# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Continuous fields description and containers.
* :class:`~hysop.fields.continuous.FieldContainerI`
* :class:`~hysop.fields.continuous.ScalarField`
* :class:`~hysop.fields.continuous.VectorField`
* :class:`~hysop.fields.continuous.TensorField`
"""
import textwrap
import sympy as sm
import numpy as np
from abc import ABCMeta, abstractmethod
from hysop.constants import (
HYSOP_REAL,
HYSOP_BOOL,
BoundaryCondition,
BoundaryConditionConfig,
DirectionLabels,
)
from hysop.tools.decorators import debug
from hysop.tools.htypes import check_instance, first_not_None, to_tuple
from hysop.tools.warning import HysopWarning
from hysop.tools.handle import TaggedObject
from hysop.tools.numpywrappers import npw
from hysop.domain.domain import Domain
from hysop.domain.box import BoxBoundaryCondition
from hysop.topology.topology import Topology, TopologyState
from hysop.tools.sympy_utils import (
nabla,
partial,
subscript,
subscripts,
exponent,
exponents,
xsymbol,
)
from hysop.symbolic import SpaceSymbol
from hysop.tools.interface import (
NamedObjectI,
SymbolContainerI,
NamedScalarContainerI,
NamedTensorContainerI,
)
[docs]
class FieldContainerI(TaggedObject):
"""Common abstract interface for scalar and tensor-like fields."""
[docs]
@debug
def __new__(
cls, domain, name=None, nb_components=None, shape=None, is_vector=None, **kwds
):
"""
Create a FieldContainer on a specific domain.
domain : domain.Domain
Physical domain where this field is defined.
kwds: dict
Keywords arguments for base class.
Notes
-----
This can also be used to instantiate a ScalarField, VectorField or TensorField
depending on nb_components, shape and is_vector.
"""
if not issubclass(cls, (ScalarField, VectorField, TensorField)):
check_instance(name, str, allow_none=False)
if shape is not None:
return TensorField(domain=domain, name=name, shape=shape, **kwds)
elif (is_vector is True) or (
(nb_components is not None) and (nb_components > 1)
):
nb_components = first_not_None(nb_components, domain.dim)
assert (is_vector is not True) or (nb_components == domain.dim)
return VectorField(
domain=domain, name=name, nb_components=nb_components, **kwds
)
else:
return ScalarField(domain=domain, name=name, **kwds)
check_instance(domain, Domain)
obj = super().__new__(cls, **kwds)
obj._domain = domain
obj._dim = int(domain.dim)
return obj
@debug
def __init__(
self, domain, name=None, nb_components=None, shape=None, is_vector=None, **kwds
):
super().__init__(**kwds)
@property
def is_scalar(self):
return not self.is_tensor
[docs]
@abstractmethod
def field_like(self, name, **kwds):
"""Create a ScalarField like this object, possibly altered."""
pass
[docs]
@abstractmethod
def tmp_like(self, name, **kwds):
"""Create a temporary field like self, possibly altered."""
pass
[docs]
@abstractmethod
def fields(self):
"""Return all unique scalar fields contained in this field container."""
pass
[docs]
@abstractmethod
def nb_components(self):
"""
Total number of components of this field container, exluding None entries,
but including duplicate fields.
"""
pass
[docs]
@abstractmethod
def discretize(self, topology, topology_state=None):
"""
Discretization of the field on a given topology.
Parameters
----------
topology : :class:`hysop.mpi.core.topology.Topology`
The topology on which to discretize this ScalarField.
state: :class:`hysop.mpi.core.topology.TopologyState`, optional
The topology state on which to discretize this ScalarField.
"""
pass
[docs]
@classmethod
def from_sympy_expressions(
cls,
name,
exprs,
space_symbols,
scalar_name_prefix=None,
scalar_pretty_name_prefix=None,
pretty_name=None,
**kwds,
):
"""
Create a field wich has the same shape as exprs, with optional names.
Expressions should be of kind sympy.Expr and are converted to FieldExpression: this
means they all have to contain at least one FieldExpression.
Note that field.symbol is always a SymbolicField which is a FieldExpression.
FieldExpression make sure boundary conditions match between fields for derivatives
and integrations.
"""
if isinstance(exprs, sm.Expr):
raise NotImplementedError("Call self.from_sympy_expression instead.")
check_instance(exprs, npw.ndarray, values=sm.Expr)
check_instance(name, str)
check_instance(pretty_name, str, allow_none=True)
check_instance(scalar_name_prefix, str, allow_none=True)
check_instance(scalar_pretty_name_prefix, str, allow_none=True)
fields = npw.empty(shape=exprs.shape, dtype=object)
fields[...] = None
for idx in npw.ndindex(*exprs.shape):
if scalar_name_prefix is not None:
sname = TensorField.default_name_formatter(scalar_name_prefix, idx)
if scalar_pretty_name_prefix is not None:
spname = TensorField.default_pretty_name_formatter(
scalar_pretty_name_prefix, idx
)
else:
spname = TensorField.default_pretty_name_formatter(
scalar_name_prefix, idx
)
else:
# names will be autogenerated from sympy expression
sname = None
spname = None
fields[idx] = cls.from_sympy_expression(
expr=exprs[idx],
space_symbols=space_symbols,
name=sname,
pretty_name=spname,
**kwds,
)
return TensorField.from_field_array(
name=name, pretty_name=pretty_name, fields=fields
)
[docs]
@classmethod
def from_sympy_expression(cls, expr, space_symbols, **kwds):
from hysop.symbolic.field import FieldExpressionBuilder
from hysop.tools.field_utils import print_all_names
assert "lboundaries" not in kwds
assert "rboundaries" not in kwds
assert "domain" not in kwds
# determine names if not given
if ("name" not in kwds) or (kwds["name"] is None):
(name, pretty_name, var_name, latex_name) = print_all_names(expr)
kwds["name"] = name
kwds["pretty_name"] = pretty_name
kwds["var_name"] = var_name
kwds["latex_name"] = latex_name
# determine domain and boundary conditions
fe = FieldExpressionBuilder.to_field_expression(
expr=expr, space_symbols=space_symbols, strict=True
)
kwds["domain"] = fe.domain
kwds["lboundaries"] = fe.lboundaries
kwds["rboundaries"] = fe.rboundaries
# deduce data type from field expression if not specified
kwds["dtype"] = first_not_None(kwds.get("dtype", None), fe.dtype)
# finally return create and return the ScalarField
return ScalarField(**kwds)
[docs]
def gradient(
self,
name=None,
pretty_name=None,
scalar_name_prefix=None,
scalar_pretty_name_prefix=None,
directions=None,
axis=-1,
space_symbols=None,
dtype=None,
**kwds,
):
"""
Create a field capable of storing the gradient of self,
possibly altered.
"""
dim = self.dim # dimension of the domain
ndim = self.ndim # number of dimension of the np.ndarray
frame = self.domain.frame
directions = to_tuple(first_not_None(directions, range(dim)))
space_symbols = to_tuple(first_not_None(space_symbols, frame.coords))
check_instance(
directions, tuple, minval=0, maxval=self.dim - 1, minsize=1, unique=True
)
check_instance(axis, int, minval=-ndim, maxval=ndim - 1)
check_instance(space_symbols, tuple, values=SpaceSymbol, size=dim, unique=True)
ndirs = len(directions)
if ndim > 0:
axis = (axis + ndim) % ndim
shape = self.shape[: axis + 1] + (ndirs,) + self.shape[axis + 1 :]
else:
shape = (ndirs,)
name = first_not_None(name, f"grad_{self.name}")
pretty_name = first_not_None(
pretty_name, "{}{}".format(nabla, self.pretty_name)
)
if shape == (1,):
expr = self.symbol(frame.time, *space_symbols).diff(
space_symbols[directions[0]]
)
return self.from_sympy_expression(
expr=expr,
space_symbols=space_symbols,
name=name,
pretty_name=pretty_name,
dtype=dtype,
**kwds,
)
else:
exprs = npw.empty(shape=shape, dtype=object)
for idx in npw.ndindex(*shape):
i = idx[: axis + 1] + idx[axis + 2 :]
d = directions[idx[axis + 1]]
if self.is_tensor:
exprs[idx] = (
self[i]
.symbol(frame.time, *space_symbols)
.diff(space_symbols[d])
)
else:
assert i == (), i
exprs[idx] = self.symbol(frame.time, *space_symbols).diff(
space_symbols[d]
)
return self.from_sympy_expressions(
exprs=exprs,
space_symbols=space_symbols,
name=name,
pretty_name=pretty_name,
scalar_name_prefix=scalar_name_prefix,
scalar_pretty_name_prefix=scalar_pretty_name_prefix,
dtype=dtype,
**kwds,
)
[docs]
def laplacian(
self,
name=None,
pretty_name=None,
scalar_name_prefix=None,
scalar_pretty_name_prefix=None,
dtype=None,
**kwds,
):
from hysop.symbolic.field import laplacian
frame = self.domain.frame
exprs = laplacian(self.symbol(*frame.vars), frame)
name = first_not_None(name, f"laplacian_{self.name}")
pretty_name = first_not_None(pretty_name, "Δ{}".format(self.pretty_name))
if isinstance(exprs, npw.ndarray):
if exprs.size == 1:
expr = exprs.item()
return self.from_sympy_expression(
expr=expr,
space_symbols=frame.coords,
name=name,
pretty_name=pretty_name,
dtype=dtype,
**kwds,
)
else:
return self.from_sympy_expressions(
exprs=exprs,
space_symbols=frame.coords,
name=name,
pretty_name=pretty_name,
scalar_name_prefix=scalar_name_prefix,
scalar_pretty_name_prefix=scalar_pretty_name_prefix,
dtype=dtype,
**kwds,
)
else:
expr = exprs
return self.from_sympy_expression(
expr=expr,
space_symbols=frame.coords,
name=name,
pretty_name=pretty_name,
dtype=dtype,
**kwds,
)
[docs]
def div(
self,
name=None,
pretty_name=None,
scalar_name_prefix=None,
scalar_pretty_name_prefix=None,
axis=-1,
dtype=None,
**kwds,
):
"""
Create a field capable of storing the divergence of self,
on chosen axis.
"""
from hysop.symbolic.field import div
frame = self.domain.frame
exprs = npw.asarray(div(self.symbol(*frame.vars), frame))
name = first_not_None(name, f"div_{self.name}")
pretty_name = first_not_None(
pretty_name, "{}⋅{}".format(nabla, self.pretty_name)
)
if exprs.size in (0, 1):
expr = exprs.item()
return self.from_sympy_expression(
expr=expr,
space_symbols=frame.coords,
name=name,
pretty_name=pretty_name,
dtype=dtype,
**kwds,
)
else:
return self.from_sympy_expressions(
exprs=exprs,
space_symbols=frame.coords,
name=name,
pretty_name=pretty_name,
scalar_name_prefix=scalar_name_prefix,
scalar_pretty_name_prefix=scalar_pretty_name_prefix,
dtype=dtype,
**kwds,
)
[docs]
def curl(
self,
name=None,
pretty_name=None,
scalar_name_prefix=None,
scalar_pretty_name_prefix=None,
dtype=None,
**kwds,
):
"""
Create a field capable of storing the curl of self,
Only 2D and 3D fields are supported as the curl brings
a 1-vector to a 2-vector:
- A vector to a pseudoscalar or a pseudoscalar to a vector in 2D
- A vector to a pseudovector or a pseudovector to a vector in 3D
In 1D the curl is 0, and in 4D the curl would be a 6D 'field'.
"""
from hysop.symbolic.field import curl
if self.dim == 2:
msg = "Can only take curl for a 2D field with one or two components."
assert self.nb_components in (1, 2), msg
elif self.dim == 3:
msg = "Can only take curl for a 3D field with three components."
assert self.nb_components in (3,), msg
else:
msg = "Can only take curl for a 2D or 3D vector field."
assert self.dim in (2, 3), msg
frame = self.domain.frame
exprs = curl(self.symbol(*frame.vars), frame)
name = first_not_None(name, f"curl_{self.name}")
pretty_name = first_not_None(
pretty_name, "{}∧{}".format(nabla, self.pretty_name)
)
if isinstance(exprs, npw.ndarray):
return self.from_sympy_expressions(
exprs=exprs,
space_symbols=frame.coords,
name=name,
pretty_name=pretty_name,
scalar_name_prefix=scalar_name_prefix,
scalar_pretty_name_prefix=scalar_pretty_name_prefix,
dtype=dtype,
**kwds,
)
else:
return self.from_sympy_expression(
expr=exprs,
space_symbols=frame.coords,
name=name,
pretty_name=pretty_name,
dtype=dtype,
**kwds,
)
[docs]
def rot(self, *args, **kwds):
"""See curl."""
return self.curl(*args, **kwds)
[docs]
def get_attributes(self, *attrs):
"""
Return all matching attributes contained in self.fields,
as a tuple.
"""
objects = ()
for field in self.fields:
obj = field
for attr in attrs:
obj = getattr(obj, attr)
objects += (obj,)
return objects
[docs]
def has_unique_attribute(self, *attr):
"""
Return true if all contained fields share the same attribute
(as stated by the == comparisson operator).
"""
def are_equal(a, b):
if a is b:
return True
if type(a) != type(b):
return False
if isinstance(a, (list, tuple, set, frozenset)):
for ai, bi in zip(a, b):
if not are_equal(ai, bi):
return False
return True
if isinstance(a, dict):
for k in set(a.keys()).union(b.keys()):
if (k not in a) or (k not in b):
return False
ak, bk = a[k], b[k]
if not are_equal(ak, bk):
return False
return True
if isinstance(a, npw.ndarray):
return npw.array_equal(a, b)
return a == b
objects = self.get_attributes(*attr)
obj0 = objects[0]
for obj in objects[1:]:
if not are_equal(obj0, obj):
return False
return True
[docs]
def get_unique_attribute(self, *attr):
"""
Try to return the unique attribute common to all contained fields.
Raise an AttributeError if a attribute is not unique accross contained
field views.
"""
if self.has_unique_attribute(*attr):
return self.fields[0].get_attributes(*attr)[0]
msg = "{} is not unique accross contained fields."
msg = msg.format(".".join(str(x) for x in attr))
raise AttributeError(msg)
[docs]
def has_unique_dtype(self):
"""Return true if all contained discrete fields share the same dtype."""
return self.has_unique_attribute("dtype")
[docs]
def has_unique_lboundaries(self):
"""Return true if all contained continuous fields share the same lboundaries."""
return self.has_unique_attribute("lboundaries")
[docs]
def has_unique_rboundaries(self):
"""Return true if all contained continuous fields share the same rboundaries."""
return self.has_unique_attribute("rboundaries")
[docs]
def has_unique_boundaries(self):
"""Return true if all contained continuous fields share the same boundaries."""
return self.has_unique_attribute("boundaries")
[docs]
def has_unique_lboundaries_kind(self):
"""Return true if all contained continuous fields share the same lboundaries kind."""
return self.has_unique_attribute("lboundaries_kind")
[docs]
def has_unique_rboundaries_kind(self):
"""Return true if all contained continuous fields share the same rboundaries kind."""
return self.has_unique_attribute("rboundaries_kind")
[docs]
def has_unique_boundaries_kind(self):
"""Return true if all contained continuous fields share the same boundaries kind."""
return self.has_unique_attribute("boundaries_kind")
[docs]
def has_unique_periodicity(self):
"""Return true if all contained continuous fields share the same periodicity."""
return self.has_unique_attribute("periodicity")
@property
def dtype(self):
"""
Try to return the unique data type common to all contained fields,
else raise an AttributeError.
"""
return self.get_unique_attribute("dtype")
@property
def lboundaries(self):
"""
Try to return the unique lboundaries common to all contained fields,
else raise an AttributeError.
"""
return self.get_unique_attribute("lboundaries")
@property
def rboundaries(self):
"""
Try to return the unique rboundaries common to all contained fields,
else raise an AttributeError.
"""
return self.get_unique_attribute("rboundaries")
@property
def boundaries(self):
"""
Try to return the unique boundaries common to all contained fields,
else raise an AttributeError.
"""
return self.get_unique_attribute("boundaries")
@property
def lboundaries_kind(self):
"""
Try to return the unique lboundaries common to all contained fields,
else raise an AttributeError.
"""
return self.get_unique_attribute("lboundaries_kind")
@property
def rboundaries_kind(self):
"""
Try to return the unique rboundaries common to all contained fields,
else raise an AttributeError.
"""
return self.get_unique_attribute("rboundaries_kind")
@property
def boundaries_kind(self):
"""
Try to return the unique boundaries kind common to all contained fields,
else raise an AttributeError.
"""
return self.get_unique_attribute("boundaries_kind")
@property
def periodicity(self):
"""
Try to return the unique periodicity common to all contained fields,
else raise an AttributeError.
"""
return self.get_unique_attribute("periodicity")
def __eq__(self, other):
return self is other
def __ne__(self, other):
return self is not other
def __hash__(self):
return id(self)
def _get_domain(self):
"""Return the physical domain where this field is defined."""
return self._domain
def _get_dim(self):
"""Return the dimension of the physical domain."""
return self._dim
domain = property(_get_domain)
dim = property(_get_dim)
[docs]
class ScalarField(NamedScalarContainerI, FieldContainerI):
"""
A continuous field is a scalar field defined on a physical domain.
This object handles a dictionnary of discrete fields
(from 0 to any number).
Each discrete field is uniquely defined by the topology used to
discretize it.
Example
-------
If topo1 and topo2 are two hysop.topology.cartesian_topology.Topology instances:
scal = ScalarField(domain=dom, name='Scalar')
# Discretize the field on the two topologies:
scal.discretize(topo1)
scal.discretize(topo2)
# Access the discrete fields:
scal_discr1 = scal.discrete_fields[topo1]
scal_discr2 = scal.discrete_fields[topo2]
"""
[docs]
@debug
def __new__(
cls,
domain,
name,
pretty_name=None,
var_name=None,
latex_name=None,
initial_values=None,
dtype=HYSOP_REAL,
lboundaries=None,
rboundaries=None,
is_tmp=False,
mem_tag=None,
**kwds,
):
r"""
Create or get an existing continuous ScalarField (scalar or vector) on a specific domain.
Parameters
----------
domain : domain.Domain
Physical domain where this field is defined.
name : string
A name for the field.
pretty_name: str, optional.
A pretty name used for display whenever possible.
Defaults to name.
var_name: string, optional.
A variable name used for code generation.
This will be passed to the symbolic representation of this field.
latex_name: string, optional.
A variable name used for latex generation.
This will be passed to the symbolic representation of this field.
dtype: npw.dtype, optional, defaults to HYSOP_REAL
Underlying data type of this field
initial_values: numeric value, or tuple of numeric values, optional
Fields are initialized to specified initial value everywhere in the domain
on first discretization.
The input values are cast to given dtype.
If None, leaves the memory uninitialized.
If a single value is given, the whole field is initialized to this value,
the default being None (ie. no initialization at all).
If tuple, computational mesh will be initialized with the first value,
and ghosts will be initialized with the second value.
lboundaries: array_like of BoundaryCondition or BoundaryConditionConfig, optional
Left boundary conditions, defaults to PERIODIC on each axis.
rboundaries: array_like of BoundaryCondition or BoundaryConditionConfig, optional
Right boundary conditions, defaults to PERIODIC on each axis.
is_tmp: bool
Specify that this field is a temporary continuous field.
Basically a ScalarField that yields a temporary discrete field upon discretization.
/!\ ** WARNING *******************************************/!\
TemporaryDiscreteFields are allocated during setup using
temporary work buffers. Those work buffers are only
available withing the scope of operators thats use
this temporary field during their apply method.
/!\ ***************************************************** /!\
kwds: dict
Base class keyword arguments.
Attributes
----------
boundaries: tuple of numpy.ndarray of BoundaryCondition
Left and right boundary conditions as a tuple.
periodicity: numpy.ndarray of bool
Numpy array mask, True is axis is periodic, else False.
"""
check_instance(name, str)
check_instance(pretty_name, str, allow_none=True)
check_instance(latex_name, str, allow_none=True)
check_instance(var_name, str, allow_none=True)
check_instance(is_tmp, bool)
if mem_tag is not None:
assert is_tmp, "Can only specify mem_tag for temporary fields."
# Data type of the field
if (dtype == npw.bool_) or (dtype == bool):
import warnings
msg = "Parameter dtype=npw.bool_ has been converted "
msg += f"to HYSOP_BOOL={HYSOP_BOOL.__name__}."
warnings.warn(msg, HysopWarning)
dtype = HYSOP_BOOL
dtype = npw.dtype(dtype)
# Name and pretty name
pretty_name = first_not_None(pretty_name, name)
check_instance(pretty_name, str)
# Initial values
if not isinstance(initial_values, (list, tuple)):
initial_values = (initial_values, initial_values)
assert len(initial_values) == 2
initial_values = tuple(initial_values)
check_instance(initial_values, tuple, size=2)
# Field boundary conditions
lboundaries = npw.asarray(
first_not_None(
lboundaries, cls.default_boundaries_from_domain(domain.lboundaries)
)
)
rboundaries = npw.asarray(
first_not_None(
rboundaries, cls.default_boundaries_from_domain(domain.rboundaries)
)
)
check_instance(
lboundaries,
npw.ndarray,
values=(BoundaryCondition, BoundaryConditionConfig),
ndim=1,
size=domain.dim,
dtype=object,
allow_none=True,
)
check_instance(
rboundaries,
npw.ndarray,
values=(BoundaryCondition, BoundaryConditionConfig),
ndim=1,
size=domain.dim,
dtype=object,
allow_none=True,
)
assert lboundaries.size == rboundaries.size == domain.dim
for i, (lb, rb) in enumerate(zip(lboundaries, rboundaries)):
if (lb.bc == BoundaryCondition.PERIODIC) ^ (
rb.bc == BoundaryCondition.PERIODIC
):
msg = f"Periodic BoundaryCondition mismatch on axis {i}."
raise ValueError(msg)
check_instance(
lboundaries,
npw.ndarray,
values=(BoundaryCondition, BoundaryConditionConfig),
ndim=1,
size=domain.dim,
dtype=object,
)
check_instance(
rboundaries,
npw.ndarray,
values=(BoundaryCondition, BoundaryConditionConfig),
ndim=1,
size=domain.dim,
dtype=object,
)
periodic = BoundaryCondition.PERIODIC
periodicity = np.asarray(tuple(map(lambda x: x.bc, lboundaries))) == periodic
kwds.pop("make_field", None)
obj = super().__new__(
cls,
domain=domain,
name=name,
pretty_name=pretty_name,
var_name=var_name,
latex_name=latex_name,
tag_prefix="f",
tagged_cls=ScalarField,
**kwds,
)
obj._dtype = dtype
obj._initial_values = initial_values
obj._is_tmp = is_tmp
obj._mem_tag = mem_tag
obj._lboundaries = lboundaries
obj._rboundaries = rboundaries
obj._periodicity = periodicity
# Symbolic representation of this field
from hysop.symbolic.field import SymbolicField
obj._symbol = SymbolicField(field=obj)
# Dictionnary of all the discretizations of this field.
# keys are hysop.topology.topology.Topology,
# values are hysop.fields.discrete_field.DiscreteField.
obj._discrete_fields = {}
cls.__check_vars(obj)
return obj
def __init__(
self,
domain,
name,
pretty_name=None,
var_name=None,
latex_name=None,
initial_values=None,
dtype=HYSOP_REAL,
lboundaries=None,
rboundaries=None,
is_tmp=False,
mem_tag=None,
**kwds,
):
kwds.pop("make_field", None)
super().__init__(
domain=domain,
name=name,
pretty_name=pretty_name,
var_name=var_name,
latex_name=latex_name,
tag_prefix="f",
tagged_cls=ScalarField,
**kwds,
)
[docs]
@classmethod
def default_boundaries_from_domain(cls, boundaries):
check_instance(boundaries, npw.ndarray, values=BoxBoundaryCondition)
field_boundaries = npw.empty_like(boundaries)
field_boundaries[...] = None
for i, bd in enumerate(boundaries):
if bd is BoxBoundaryCondition.PERIODIC:
fbd = BoundaryCondition.PERIODIC
elif (
bd is BoxBoundaryCondition.SYMMETRIC
): # (normal to boundary velocity = 0)
# let any advected scalar to be 0 in boundaries
fbd = BoundaryCondition.HOMOGENEOUS_DIRICHLET
elif bd is BoxBoundaryCondition.OUTFLOW: # (velocity normal to boundary)
# let any advected scalar to go trough the boundary
fbd = BoundaryCondition.HOMOGENEOUS_NEUMANN
else:
msg = "FATAL ERROR: Unknown domain boundary condition {}."
msg = msg.format(bd)
raise NotImplementedError(msg)
field_boundaries[i] = fbd
return field_boundaries
@classmethod
def __check_vars(cls, obj):
"""Check properties and types."""
check_instance(obj.dtype, npw.dtype)
check_instance(obj.domain, Domain)
check_instance(obj.name, str)
check_instance(obj.pretty_name, str)
check_instance(obj.dim, int, minval=1)
check_instance(obj.nb_components, int, minval=1)
check_instance(obj.discrete_fields, dict)
check_instance(obj.initial_values, tuple, size=2)
check_instance(
obj.lboundaries,
npw.ndarray,
values=(BoundaryCondition, BoundaryConditionConfig),
ndim=1,
size=obj.domain.dim,
dtype=object,
)
check_instance(
obj.rboundaries,
npw.ndarray,
values=(BoundaryCondition, BoundaryConditionConfig),
ndim=1,
size=obj.domain.dim,
dtype=object,
)
check_instance(
obj.periodicity, npw.ndarray, dtype=bool, ndim=1, size=obj.domain.dim
)
check_instance(obj.is_tmp, bool)
[docs]
def field_like(
self,
name,
pretty_name=None,
latex_name=None,
var_name=None,
domain=None,
dtype=None,
is_tmp=None,
lboundaries=None,
rboundaries=None,
initial_values=None,
**kwds,
):
"""Create a ScalarField like this object, possibly altered."""
check_instance(name, str)
domain = first_not_None(domain, self.domain)
dtype = first_not_None(dtype, self.dtype)
is_tmp = first_not_None(is_tmp, self.is_tmp)
lboundaries = first_not_None(lboundaries, self.lboundaries)
rboundaries = first_not_None(rboundaries, self.rboundaries)
initial_values = first_not_None(initial_values, self.initial_values)
return ScalarField(
name=name,
pretty_name=pretty_name,
var_name=var_name,
latex_name=latex_name,
domain=domain,
dtype=dtype,
is_tmp=is_tmp,
lboundaries=lboundaries,
rboundaries=rboundaries,
initial_values=initial_values,
**kwds,
)
[docs]
def tmp_like(self, name, **kwds):
"""Create a TemporaryField like self, possibly altered."""
return self.field_like(name=name, is_tmp=True, **kwds)
[docs]
def short_description(self):
"""Short description of this field."""
s = "{}[pname={}, dim={}, dtype={}, bc=[{}], iv={}]"
s = s.format(
self.full_tag,
self.name,
self.dim,
self.dtype,
self.format_boundaries(),
self.initial_values,
)
return s
[docs]
def long_description(self):
"""Long description of this field."""
s = textwrap.dedent(
"""
{}
*name: {}
*pretty_name: {}
*var_name: {}
*latex_name: {}
*dim: {}
*dtype: {}
*left boundary: {}
*right boundary: {}
*initial values: {}
*topology tags: [{}]
"""
).format(
self.full_tag,
self.name,
self.pretty_name,
self.var_name,
self.latex_name,
self.dim,
self.dtype,
self.lboundaries.tolist(),
self.rboundaries.tolist(),
self.initial_values,
",".join([k.full_tag for k in self.discrete_fields.keys()]),
)
return s[1:]
[docs]
@debug
def discretize(self, topology, topology_state=None):
"""
Discretization of the field on a given topology.
Parameters
----------
topology : :class:`hysop.mpi.core.topology.Topology`
The topology on which to discretize this ScalarField.
state: :class:`hysop.mpi.core.topology.TopologyState`, optional
The topology state on which to discretize this ScalarField.
Returns
--------
The discretized field :class:`hysop.fields.discrete_field.DiscreteField`.
Note
----
In the case the discretization already exists,
simply returns the discretized field.
"""
from hysop.fields.discrete_field import DiscreteField
topology_state = first_not_None(topology_state, topology.default_state())
check_instance(topology, Topology)
check_instance(topology_state, TopologyState)
if topology not in self.discrete_fields:
dfield = topology.discretize(self)
check_instance(dfield, DiscreteField)
if not self.is_tmp:
assert topology in self.discrete_fields
assert self.discrete_fields[topology] is dfield
else:
dfield = self.discrete_fields[topology]
return dfield.view(topology_state)
def _get_dtype(self):
"""Return the default allocation dtype of this ScalarField."""
return self._dtype
def _get_initial_values(self):
"""Return initial value of this field (compute_val, ghost_val)."""
return self._initial_values
def _get_discrete_fields(self):
"""
Return the dictionnary containing all the discretizations
of this field.
"""
return self._discrete_fields
def _get_lboundaries(self):
"""Left boundary conditions."""
return self._lboundaries
def _get_rboundaries(self):
"""Right boundary conditions."""
return self._rboundaries
def _get_lboundaries_kind(self):
"""Left boundary condition kind."""
return np.asarray(tuple(map(lambda x: x.bc, self._lboundaries)))
def _get_rboundaries_kind(self):
"""Right boundary condition kind."""
return np.asarray(tuple(map(lambda x: x.bc, self._rboundaries)))
def _get_boundaries(self):
"""Left and right boundary conditions as a tuple."""
return (self._lboundaries, self._rboundaries)
def _get_boundaries_kind(self):
"""Left and right boundary condition kind as a tuple."""
return (self.lboundaries_kind, self._get_lboundaries_kind)
def _get_periodicity(self):
"""Numpy array mask, True is axis is periodic, else False."""
return self._periodicity
def _get_is_tmp(self):
"""Is this ScalarField a temporary field ?"""
return self._is_tmp
def _get_mem_tag(self):
return self._mem_tag
dtype = property(_get_dtype)
initial_values = property(_get_initial_values)
discrete_fields = property(_get_discrete_fields)
lboundaries = property(_get_lboundaries)
rboundaries = property(_get_rboundaries)
boundaries = property(_get_boundaries)
lboundaries_kind = property(_get_lboundaries_kind)
rboundaries_kind = property(_get_rboundaries_kind)
boundaries_kind = property(_get_boundaries_kind)
periodicity = property(_get_periodicity)
is_tmp = property(_get_is_tmp)
mem_tag = property(_get_mem_tag)
@property
def is_tensor(self):
return False
@property
def fields(self):
return (self,)
@property
def nb_components(self):
return 1
def __str__(self):
return self.long_description()
def __eq__(self, other):
return self is other
def __ne__(self, other):
return self is not other
def __hash__(self):
return id(self)
[docs]
class TensorField(NamedTensorContainerI, FieldContainerI):
"""
A continuous tensor field is a collection of scalar fields
defined on a physical domain, organized as a multi-dimensional array..
This object handles a numpy.ndarray of continuous scalar fields,
which may have different attributes (different data types for
example). It is mainly a view on scalar fields with the FieldContainerI interface.
A tensor field garanties that each different field objects have
unique names and pretty names within the tensor field. A given
continuous scalar field may appear in at multiple indices (to allow
symetric tensor for example). Some components may also be set to None
(to allow upper triangular matrices for example). Is also garanties
that all fields shares the same domain.
"""
@property
def is_tensor(self):
return True
def __new__(
cls,
domain,
name,
shape,
pretty_name=None,
name_formatter=None,
pretty_name_formatter=None,
skip_field=None,
make_field=None,
fields=None,
base_kwds=None,
**kwds,
):
pretty_name = first_not_None(pretty_name, name)
check_instance(name, str)
check_instance(pretty_name, str)
check_instance(shape, tuple, values=int)
if (len(shape) == 1) and not issubclass(cls, VectorField):
obj = VectorField(
domain=domain,
shape=shape,
name=name,
name_formatter=name_formatter,
pretty_name=pretty_name,
pretty_name_formatter=pretty_name_formatter,
skip_field=skip_field,
make_field=make_field,
fields=fields,
base_kwds=base_kwds,
**kwds,
)
return obj
name_formatter = first_not_None(name_formatter, cls.default_name_formatter)
pretty_name_formatter = first_not_None(
pretty_name_formatter, cls.default_pretty_name_formatter
)
skip_field = first_not_None(skip_field, cls.default_skip_field)
make_field = first_not_None(make_field, cls.default_make_field)
base_kwds = first_not_None(base_kwds, {})
check_instance(domain, Domain)
if npw.prod(shape) <= 0:
msg = "Invalid shape for a tensor-like field, got {}."
msg = msg.format(shape)
raise ValueError(msg)
if fields is None:
fields = ()
for idx in npw.ndindex(*shape):
if skip_field(idx):
field = None
else:
fname = name_formatter(basename=name, idx=idx)
pfname = pretty_name_formatter(basename=pretty_name, idx=idx)
field = make_field(
idx, domain=domain, name=fname, pretty_name=pfname, **kwds
)
fields += (field,)
cls._check_fields(*fields)
fields = npw.asarray(fields, dtype=object).reshape(shape)
else:
assert not kwds
check_instance(fields, npw.ndarray, dtype=object)
assert npw.array_equal(fields.shape, shape)
obj = super().__new__(
cls,
domain=domain,
name=name,
pretty_name=pretty_name,
tag_prefix="tf",
tagged_cls=TensorField,
contained_objects=fields,
**base_kwds,
)
obj._fields = fields
obj._name_formatter = name_formatter
obj._pretty_name_formatter = pretty_name_formatter
obj._skip_field = skip_field
from hysop.symbolic.field import SymbolicFieldTensor
obj._symbol = SymbolicFieldTensor(field=obj)
obj._check_domains(domain)
obj._check_names()
return obj
def __init__(
self,
domain,
name,
shape,
pretty_name=None,
name_formatter=None,
pretty_name_formatter=None,
skip_field=None,
make_field=None,
fields=None,
base_kwds=None,
**kwds,
):
base_kwds = first_not_None(base_kwds, {})
super().__init__(
domain=domain,
name=name,
pretty_name=pretty_name,
tag_prefix="tf",
tagged_cls=TensorField,
contained_objects=fields,
**base_kwds,
)
[docs]
def discretize(self, topology, topology_state=None):
from hysop.fields.discrete_field import DiscreteTensorField
dfields = npw.empty(shape=self.shape, dtype=object)
for idx, field in self.nd_iter():
dfields[idx] = field.discretize(
topology=topology, topology_state=topology_state
)
return DiscreteTensorField(field=self, dfields=dfields)
[docs]
@classmethod
def from_fields(cls, name, fields, shape, pretty_name=None, **kwds):
"""Create a TensorField from a list of fields and a shape."""
fields = to_tuple(fields)
shape = to_tuple(shape)
check_instance(fields, tuple, values=(ScalarField,), minsize=1)
check_instance(shape, tuple, values=int)
check_instance(name, str)
check_instance(pretty_name, str, allow_none=True)
cls._check_fields(*fields)
field0 = first_not_None(*fields)
domain = field0.domain
fields = npw.asarray(fields, dtype=object).reshape(shape)
return Field(
domain=domain,
name=name,
shape=shape,
pretty_name=pretty_name,
fields=fields,
**kwds,
)
[docs]
@classmethod
def from_field_array(cls, name, fields, pretty_name=None, **kwds):
"""Create a TensorField from numpy.ndarray of fields."""
assert fields.size > 1
check_instance(name, str)
check_instance(pretty_name, str, allow_none=True)
check_instance(fields, npw.ndarray, dtype=object, values=ScalarField)
shape = fields.shape
_fields = tuple(fields.ravel().tolist())
cls._check_fields(*_fields)
field0 = first_not_None(*_fields)
domain = field0.domain
return Field(
domain=domain,
name=name,
pretty_name=pretty_name,
shape=shape,
fields=fields,
**kwds,
)
@classmethod
def _check_fields(cls, *fields):
"""Check that at least one field is specified."""
field0 = first_not_None(*fields)
if field0 is None:
msg = "Tensor field {} should at least contain a valid ScalarField."
msg = msg.format(field0.name)
raise ValueError(msg)
[docs]
@classmethod
def default_make_field(cls, idx, **kwds):
return ScalarField(**kwds)
[docs]
@classmethod
def default_skip_field(cls, idx):
return False
def _check_domains(self, domain):
"""Check that fields share a unique domain."""
for field in self:
check_instance(field, ScalarField)
if field.domain.domain is not domain:
msg = f"Domain mismatch for field {field.name}."
raise ValueError(msg)
def _check_names(self):
"""Check that fields names are unique."""
names = {}
pnames = {}
for field in self:
name = field.name
pname = field.pretty_name
if (name in names) and (names[name] is not field):
msg = "Name {} was already used by another field."
msg = msg.format(name)
raise ValueError(msg)
if (pname in pnames) and (pnames[pname] is not field):
msg = "Name {} was already used by another field."
msg = msg.format(pname)
raise ValueError(msg)
names[name] = field
pnames[name] = field
@property
def fields(self):
"""Return all unique scalar fields contained in this field-like interface."""
ordered_fields = self._fields.ravel().tolist()
fields = set(ordered_fields)
fields.discard(None)
# keep field ordering unlike using a set
unique_fields = ()
for field in ordered_fields:
if (field in fields) and (field not in unique_fields):
unique_fields += (field,)
return unique_fields
@property
def nb_components(self):
return len(self.fields)
[docs]
def short_description(self):
"""Short description of this tensor field."""
s = "{}[name={}, pname={}, dim={}, shape={}]"
s = s.format(self.full_tag, self.name, self.pretty_name, self.dim, self.shape)
return s
[docs]
def long_description(self):
"""Long description of this tensor field as a string."""
s = textwrap.dedent(
"""
{}
*name: {}
*pretty_name: {}
*dim: {}
*shape: {}
*nb_components: {}
*symbolic repr.:
""".format(
self.full_tag,
self.name,
self.pretty_name,
self.dim,
self.shape,
self.nb_components,
)
)[1:]
s += " " + "\n ".join(str(self.symbol).split("\n"))
return s
[docs]
def field_like(
self,
name,
pretty_name=None,
shape=None,
nb_components=None,
fn="field_like",
**kwds,
):
"""Create a TensorField like this object, possibly altered."""
if (shape is None) and (nb_components is not None):
shape = (nb_components,)
del nb_components
shape = first_not_None(shape, self.shape)
nb_components = npw.prod(shape, dtype=npw.int64)
pretty_name = first_not_None(pretty_name, name)
check_instance(name, str)
check_instance(pretty_name, str)
if nb_components == 1:
return getattr(self.fields[0], fn)(
name=name, pretty_name=pretty_name, **kwds
)
else:
fields = npw.empty(shape=shape, dtype=object)
if self.shape == shape:
for idx, field in self.nd_iter():
fname = self._name_formatter(basename=name, idx=idx)
pfname = self._pretty_name_formatter(basename=pretty_name, idx=idx)
fields[idx] = getattr(field, fn)(
name=fname, pretty_name=pfname, **kwds
)
else:
field = self.fields[0]
for idx in npw.ndindex(*shape):
fname = self._name_formatter(basename=name, idx=idx)
pfname = self._pretty_name_formatter(basename=pretty_name, idx=idx)
fields[idx] = getattr(field, fn)(
name=fname, pretty_name=pfname, **kwds
)
return self.from_field_array(
name=name, pretty_name=pretty_name, fields=fields
)
[docs]
def tmp_like(self, name, **kwds):
"""Create a temporary field like self, possibly altered."""
return self.field_like(name=name, fn="tmp_like", **kwds)
def __getitem__(self, slc):
fields = self._fields.__getitem__(slc)
if isinstance(fields, ScalarField):
return fields
else:
name = f"{self.name}_{str(slc)}"
pretty_name = f"{self.pretty_name}_{str(slc)}"
return self.from_field_array(
name=name, pretty_name=pretty_name, fields=fields
)
[docs]
class VectorField(TensorField):
"""Specialization for vector fields, ie. a TensorField with ndim=1."""
def __new__(cls, domain, name, nb_components=None, shape=None, **kwds):
assert (nb_components is None) ^ (shape is None)
if shape is None:
check_instance(nb_components, int, minval=1)
shape = (nb_components,)
check_instance(shape, tuple, values=int, size=1)
if shape[0] == 1:
return ScalarField(domain=domain, name=name, **kwds)
obj = super().__new__(cls, domain=domain, name=name, shape=shape, **kwds)
return obj
def __init__(self, domain, name, nb_components=None, shape=None, **kwds):
super().__init__(domain=domain, name=name, shape=shape, **kwds)
Field = FieldContainerI
"""A Field is just a alias of FieldContainerI"""